package org.zjvis.datascience.service.cleanup;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zjvis.datascience.common.dto.TaskInstanceDTO;
import org.zjvis.datascience.service.TaskInstanceService;
import org.zjvis.datascience.service.dataprovider.GPDataProvider;

import java.util.List;

/**
 * @description 清洗操作线程
 * @date 2021-10-15
 */
public class CleanUpRunner implements Runnable {
    private final static Logger logger = LoggerFactory.getLogger("CleanUpRunner");

    //有冗余表的taskId
    private Long taskId;

    private TaskInstanceService taskInstanceService;

    private GPDataProvider gpDataProvider;

    public CleanUpRunner(Long taskId,
                         TaskInstanceService taskInstanceService,
                         GPDataProvider gpDataProvider) {
        this.taskId = taskId;
        this.taskInstanceService = taskInstanceService;
        this.gpDataProvider = gpDataProvider;
    }

    @Override
    public void run() {
        //查询出待删除taskId状态为成功的sql_text,log_info
        List<TaskInstanceDTO> taskInstanceDTOS = taskInstanceService
                .queryRedundantTableByTaskId(taskId, CleanUpTemporaryTableScheduler.getLimitDate());
        boolean isView = false;
        StringBuilder tables = new StringBuilder();
        for (int i = CleanUpTemporaryTableScheduler.RETAIN_NUM; i < taskInstanceDTOS.size(); i++) {
            TaskInstanceDTO taskInstanceDTO = taskInstanceDTOS.get(i);
            //待删除的冗余视图
            if (StringUtils.isEmpty(taskInstanceDTO.getLogInfo())) {
                String sqlText = taskInstanceDTO.getSqlText();
                if (StringUtils.isEmpty(sqlText)) {
                    logger.error(String.format("cleanup view sqlText is empty or sqlText is not view.It maybe a isolated task, taskId=%s id=%s", taskId, taskInstanceDTO.getId()));
                    continue;
                }
                if (sqlText.toUpperCase().startsWith("CREATE VIEW")) {
                    isView = true;
                    String tableName = sqlText.split(" ")[2];
                    tables.append(tableName).append(",");
                }
                if (sqlText.toUpperCase().startsWith("CREATE TABLE")) {
                    isView = false;
                    String tableName = sqlText.split(" ")[2];
                    tables.append(tableName).append(",");
                }
            } else {//待删除的冗余表
                String logInfo = taskInstanceDTO.getLogInfo();
                JSONObject jsonObject = JSONObject.parseObject(logInfo);
                JSONObject result = jsonObject.getJSONObject("result");
                if (!result.containsKey("output_params")) {
                    logger.error(String.format("cleanup table output_params is not exist!!! taskId=%s id=%s", taskId, taskInstanceDTO.getId()));
                    continue;
                }
                JSONArray outputs = result.getJSONArray("output_params");
                for (int j = 0; j < outputs.size(); j++) {
                    JSONObject obj = outputs.getJSONObject(j);
                    String tableName = obj.getString("out_table_name");
                    tables.append(tableName).append(",");
                }
            }
        }
        //删除表或视图
        if (StringUtils.isNotEmpty(tables)) {
            gpDataProvider
                    .dropRedundantTables(tables.deleteCharAt(tables.length() - 1).toString(), isView);
            logger.info(String.format("CleanUpRunner cleanup task finish. taskId=%s tables=%s", taskId, tables.toString()));
        }
    }

}
